package com.atguigu.realtime.app

import java.util

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.common.Constant
import com.atguigu.realtime.bean.{AlertInfo, EventLog}
import com.atguigu.realtime.util.ESUtil
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

import scala.util.control.Breaks._
import ESUtil._

/**
 * Author atguigu
 * Date 2020/10/19 14:51
 */
object AlertApp extends BaseApp {
    override val appName: String = "AlertApp"
    override val master: String = "local[2]"
    override val batchTime: Int = 3
    override val groupId: String = "AlertApp"
    override val topic: String = Constant.EVENT_TOPIC
    
    override def run(ssc: StreamingContext, sourceStream: DStream[String]): Unit = {
        sourceStream
            .window(Minutes(5), Seconds(6))
            .map(json => {
                val eventLog = JSON.parseObject(json, classOf[EventLog])
                (eventLog.mid, eventLog)
            })
            .groupByKey()
            .map {
                case (mid, it: Iterable[EventLog]) =>
                    // 1. 统计领取优惠券用户的个数量
                    val uids: util.HashSet[String] = new java.util.HashSet[String]()
                    // 2. 当前设备所有的行为
                    val events: util.ArrayList[String] = new java.util.ArrayList[String]()
                    // 3. 优惠券对应的商品id
                    val items: util.HashSet[String] = new java.util.HashSet[String]()
                    
                    // 4. 标记是否浏览过商品. 默认没有
                    var isBrowser = false
                    breakable {
                        it.foreach(eventLog => {
                            // 保存所有的时间
                            events.add(eventLog.eventId)
                            eventLog.eventId match {
                                case "coupon" =>
                                    uids.add(eventLog.uid) // 把领取优惠券的用户的id存起来
                                    items.add(eventLog.itemId) // 存储优惠券对应的商品id
                                case "clickItem" =>
                                    isBrowser = true
                                    break
                                case _ =>
                            }
                        })
                    }
                    //(是否预警 true/false, 预警信息的封装)
                    (!isBrowser && uids.size() >= 3,
                        AlertInfo(mid, uids, items, events, System.currentTimeMillis()))
            }
            .filter(_._1)
            .map(_._2)
            .foreachRDD(rdd => {
                println("xxxx")
                // 把 预警写入到es
                /*rdd.foreachPartition((it: Iterator[AlertInfo]) => {
                    ESUtil.insertBulk(
                        "gmall_coupon_alert",
                        it.map(info => (info.mid + ":" + info.ts / 1000 / 60, info)))
                })*/
                rdd.saveToES("gmall_coupon_alert")
            })
    }
}

/*
什么样的行为需要预警:

需求：
同一设备，5分钟内三次及以上用不同账号登录并领取优惠劵，
并且在登录到领劵过程中没有浏览商品。同时达到以上要求则产生一条预警日志。
同一设备，每分钟只记录一次预警。

分析:
同一设备   按照设备id分组
5分钟内    window    窗口长度: 5分钟  滑动步长: 6s

三次及以上用不同账号登录并领取优惠劵
    统计领取优惠券的用户的个数
并且在登录到领劵过程中没有浏览商品
    没有浏览商品: 5分种内没有出现浏览商品的行为
    
es处理:
    同一设备，每分钟只记录一次预警。
        spark-Streaming不负责, 交给es来完成



 */
